package com.daoshu.socket.handler.netty;

import com.daoshu.socket.config.WebsocketConfig;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;

/**
 * @ClassName: WebSocketFrameHandler
 * @description: WebScoket处理
 * @author: Allen
 * @create: 2019-06-17 09:08
 **/
@Slf4j
public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    private WebsocketConfig nettyConfig;

    public TextWebSocketFrameHandler(WebsocketConfig nettyConfig) {
        this.nettyConfig = nettyConfig;
    }

    /****
     * 用户连接组
     */
    static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        Channel channel = ctx.channel();
        log.debug("IP:===>>>{} , body ===>>>{}", channel.remoteAddress(), msg.text());

        ctx.channel().writeAndFlush(new TextWebSocketFrame("【自己】" + msg.text() + "\n"));
        for (Channel ch : channelGroup) {
            if (ch != ctx.channel()) {
                ch.writeAndFlush(new TextWebSocketFrame(ctx.channel().remoteAddress() + " 发送的消息" + msg.text() + "\r\n"));
            }
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("IP:" + ctx.channel().remoteAddress() + " 上线");
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("IP:" + ctx.channel().remoteAddress() + " 下线");
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        log.info("IP:" + ctx.channel().remoteAddress() + " 加入");
        //将会通知到channelGroup里面的所有的channel
        channelGroup.writeAndFlush("【服务器消息】-" + channel.remoteAddress() + "加入\n");
        //将channel放入channelGroup中
        channelGroup.add(channel);
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        log.info("IP:" + ctx.channel().remoteAddress() + " 移除");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.info("WebSocketFrameHandler err:" + cause.getMessage());
    }

    private int lossConnectCount = 0;

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            //读监听
            if (event.state() == IdleState.READER_IDLE) {
                lossConnectCount++;
                if (lossConnectCount > nettyConfig.getLossConnectCount()) {
                    log.info("关闭这个不活跃通道！");
                    ctx.channel().close();
                }
            }

        }
    }
}

